package com.xxl.job.core.thread;

import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.executor.XxlJobExecutor;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Job执行完成后，调用 调度中心完成回调
 * Created by xuxueli on 16/7/22.
 */
@Slf4j
public class TriggerCallbackThread {
    private static TriggerCallbackThread instance = new TriggerCallbackThread();
    public static TriggerCallbackThread getInstance(){
        return instance;
    }
    public static void pushCallBack(HandleCallbackParam handleCallbackParam) {
    	/**
    	 * 增加回调
    	 */
        getInstance().handleCallbackParamQueue.add(handleCallbackParam);
        log.info(">>>>>>>>>>> xxl-job, push callback request, logId:{}", handleCallbackParam.getLogId());
    }
    
    
    
    /**
     * job results callback queue
     */
    private BlockingQueue<HandleCallbackParam> handleCallbackParamQueue = new LinkedBlockingQueue<HandleCallbackParam>();
    
    
    private Thread triggerCallbackThread;
    private volatile boolean toStop = false;
    
    public void start() {
        // valid
        if (XxlJobExecutor.getAdminBizs() == null) {
            log.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
            return;
        }

        triggerCallbackThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // normal callback
                while(!toStop){
                    try {
                        HandleCallbackParam handleCallbackParam = handleCallbackParamQueue.take();
                        if (handleCallbackParam != null) {
                            doCallback(handleCallbackParam);
                        }
                    } 
                    catch (Exception e) {
                        log.error(e.getMessage(), e);
                    }
                }
                
                
                log.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
            }
        });
        
        triggerCallbackThread.setDaemon(true);
        triggerCallbackThread.start();
    }
    
    public void toStop(){
        toStop = true;
        // interrupt and wait
        triggerCallbackThread.interrupt();
        try {
            triggerCallbackThread.join();
        } 
        catch (InterruptedException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * do callback, will retry if error
     * @param callbackParamList
     */
    private void doCallback(HandleCallbackParam handleCallbackParam) {
    	
        // callback, will retry if error
        for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizs()) {
            try {
                ReturnT<String> callbackResult = adminBiz.callback(handleCallbackParam);
                if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                    callbackResult = ReturnT.SUCCESS;
                    log.info(">>>>>>>>>>> xxl-job callback success, handleCallbackParam:{}, callbackResult:{}",handleCallbackParam, callbackResult);
                    break;
                }
                else {
                    log.info(">>>>>>>>>>> xxl-job callback fail, handleCallbackParam:{}, callbackResult:{}", handleCallbackParam, callbackResult);
                }
            }
            catch (Exception e) {
                log.error(">>>>>>>>>>> xxl-job callback error, handleCallbackParam：{}",handleCallbackParam, e);
                pushCallBack(handleCallbackParam);
            }
        }
    }
    
}